本项目中,将对数据集中的猫狗进行分类。 数据源来自于kaggle比赛“Dogs vs. Cats Redux: Kernels Edition”。 数据集链接:https://www.kaggle.com/c/dogs-vs-cats-redux-kernels-edition/data 将对数据集进行预处理(剔除异常值、统一图片size等),然后利用keras提供的深度神经网络学习架构对猫狗数据集进行建模(二分类模型)。最后,利用优化后的模型在测试集上进行预测,并将预测结果提交到kaggle,目标是达到kaggle排名的前10%。
数据集中图片大小是否一致?图片标签有哪些?这些标签是顺序排列还是随机排列?是否存在异常值(标签与图片内容不符)?探索数据集将有利于预处理数据,并使预测结果更准确。
将train目录下训练集数据和test目录下测试集数据读入列表中(文件路径)
import matplotlib.pyplot as plt
import numpy as np
import math
import os
import shutil
import cv2
import sys
%matplotlib inline
# Dataset directories, relative to the working directory.
train_image_path, test_image_path = "./train/", "./test/"

# File-path lists for the two directories (two distinct, initially empty lists).
train_image_list, test_image_list = [], []

# Presumably the target image height/width; these two names are not referenced
# anywhere else in the visible code -- TODO confirm before relying on them.
rows = cols = 299

# Optional one-off step (kept disabled): re-extract the Kaggle archives.
# if os.path.exists("./train/"):
#     shutil.rmtree("./train/")
# os.system('unzip train.zip')
# if os.path.exists("./test/"):
#     shutil.rmtree("./test/")
# os.system('unzip test.zip')
def get_image_list(path_name, list_name):
    """Append the path of every file in a directory to ``list_name``.

    Args:
        path_name: Directory to scan (not recursive).
        list_name: List that is extended in place with
            ``os.path.join(path_name, entry)`` for each regular file.

    Returns:
        The same ``list_name`` object, so the call can also be used as an
        expression.
    """
    # os.listdir() returns entries in arbitrary order; sorting keeps the seeded
    # random.sample() calls and any index-based lookups reproducible.
    for file_name in sorted(os.listdir(path_name)):
        full_path = os.path.join(path_name, file_name)
        # Skip sub-directories (e.g. notebook checkpoint folders): they cannot
        # be decoded as images by the callers.
        if os.path.isfile(full_path):
            list_name.append(full_path)
    return list_name
# Fill the two path lists from the train and test directories.
get_image_list(train_image_path, train_image_list)
get_image_list(test_image_path, test_image_list)
# Report how many entries were collected for each set.
print("train image sample:{}\ntest image sample:{}".format(len(train_image_list),len(test_image_list)))
实现显示图片函数,并从训练集和测试集中各随机挑选10张图片显示
def display_img(img_list, summary = True):
    """Show images in a five-column grid, each titled with its file name.

    Every image is read with OpenCV, converted from BGR to RGB and resized to
    128x128 for display only.

    Args:
        img_list: Paths of the images to display.
        summary: When true, print each image's path and original array shape.

    Returns:
        None. Nothing is drawn when ``img_list`` is empty.
    """
    if not img_list:
        # A grid with zero rows would request a zero-height figure.
        return
    n_cols = 5
    n_rows = math.ceil(len(img_list) / n_cols)
    fig = plt.figure(figsize=(15, 3 * n_rows))
    for i, img_path in enumerate(img_list):
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread() signals an unreadable file by returning None.
            print("---->image: {} - could not be read, skipped".format(img_path))
            continue
        img = img[:, :, ::-1]  # BGR -> RGB
        if summary:
            print("---->image: {} - shape: {}".format(img_path, img.shape))
        ax = fig.add_subplot(n_rows, n_cols, i + 1)
        ax.set_title(os.path.basename(img_path))
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(cv2.resize(img, (128, 128)))
    plt.show()
import random
# Fixed seed so the same images are sampled on every run (for a given list order).
random.seed(2018)
# Show ten randomly chosen training images, then ten randomly chosen test images.
display_img(random.sample(train_image_list, 10))
display_img(random.sample(test_image_list, 10))
下面我们绘制训练集图片size分布情况。
def get_pic_size_distribution(img_list, min_length=25000):
    """Collect the first two array dimensions of every image.

    Args:
        img_list: Paths of the images to measure.
        min_length: Minimum length of the returned arrays. The default of
            25000 is the size that used to be hard-coded; slots beyond
            ``len(img_list)`` stay 0. Pass 0 to get arrays of exactly
            ``len(img_list)`` entries.

    Returns:
        Tuple ``(x_PX, y_PX)`` of float arrays where ``x_PX[i]`` is
        ``img.shape[0]`` (rows) and ``y_PX[i]`` is ``img.shape[1]`` (columns)
        of the image at ``img_list[i]``.

    Raises:
        ValueError: If an image cannot be read by OpenCV.
    """
    # Grow with the input so lists longer than min_length no longer overflow.
    length = max(len(img_list), min_length)
    x_PX = np.zeros(length)
    y_PX = np.zeros(length)
    for i, item in enumerate(img_list):
        img = cv2.imread(item)
        if img is None:
            # cv2.imread() returns None instead of raising on a bad file.
            raise ValueError("cannot read image: {}".format(item))
        x_PX[i], y_PX[i] = img.shape[:2]
    return x_PX, y_PX
def show_pic_size_distribution( x_PX, y_PX ):
    """Draw a scatter plot of ``x_PX`` against ``y_PX`` on a 15x15 figure."""
    _, axes = plt.subplots(figsize=(15, 15))
    # Blue points, labelled 'px' for the legend.
    axes.scatter(x_PX, y_PX, c='blue', label='px')
    axes.set_title('pic_size_distribution')
    axes.set_xlabel('x_px')
    axes.set_ylabel('y_px')
    # loc=2 places the legend in the upper-left corner.
    axes.legend(loc=2)
    plt.show()
# Plot the size distribution of the training images.
x_PX, y_PX = get_pic_size_distribution(train_image_list)
show_pic_size_distribution( x_PX, y_PX )
我们看到,几乎所有图片大小都集中在左下部分,有两个图片的size分布在右上角,那么这两个尺寸异常值是否就是图片异常值呢? 展示2张size异常的图片
# Training images whose y_PX value (img.shape[1]) exceeds 800.
# Paths and sizes are paired with zip() instead of indexing a hard-coded
# range(25000), so the selection works for any number of training images.
abnormal = [
    img_path
    for img_path, y_size in zip(train_image_list, y_PX)
    if y_size > 800
]
# Only draw when there is something to show; an empty grid has no valid size.
if abnormal:
    display_img(abnormal)
我们看到,这两张尺寸异常的图片同样是正常的猫狗图片。那么我们如何定义异常值?ImageNet数据集中包含有猫狗的具体分类,对一个图片在载有ImageNet上预训练权值的xception模型上进行预测,如果其预测结果top50不包含猫狗真实的标签分类(图片预测值前50都没有正常分类),那么就将其视为异常值
经过初步探索数据集发现:图片大小不一致;图片命名有dog、cat;标签(图片命名)是按顺序排列(cat排在前边、dog排在后边);图片存在异常值(非猫非狗)。在预处理部分,我们将resize统一图片大小,剔除异常值,打乱样本顺序,生成标签(dog:1;cat:0)。
利用Xception网络对训练集进行imagenet分类,如果topN不含有‘dog’‘cat’视为异常值
# Wildcard import; this is what brings the `xception` name used below into scope.
from keras.applications import *
# Xception with its ImageNet classification head; get_abnormal_v() passes it
# to pred_pet() to screen the training images.
model_pre=xception.Xception(weights='imagenet')
ImageNet 1000个类具体内容,参考文献 https://blog.csdn.net/zhangjunbob/article/details/53258524
# ImageNet class IDs that pred_pet() accepts as "dog" predictions
# (compared against the first element of each decoded prediction).
Dogs = [ 'n02085620','n02085782','n02085936','n02086079','n02086240','n02086646','n02086910','n02087046','n02087394','n02088094','n02088238',
'n02088364','n02088466','n02088632','n02089078','n02089867','n02089973','n02090379','n02090622','n02090721','n02091032','n02091134',
'n02091244','n02091467','n02091635','n02091831','n02092002','n02092339','n02093256','n02093428','n02093647','n02093754','n02093859',
'n02093991','n02094114','n02094258','n02094433','n02095314','n02095570','n02095889','n02096051','n02096177','n02096294','n02096437',
'n02096585','n02097047','n02097130','n02097209','n02097298','n02097474','n02097658','n02098105','n02098286','n02098413','n02099267',
'n02099429','n02099601','n02099712','n02099849','n02100236','n02100583','n02100735','n02100877','n02101006','n02101388','n02101556',
'n02102040','n02102177','n02102318','n02102480','n02102973','n02104029','n02104365','n02105056','n02105162','n02105251','n02105412',
'n02105505','n02105641','n02105855','n02106030','n02106166','n02106382','n02106550','n02106662','n02107142','n02107312','n02107574',
'n02107683','n02107908','n02108000','n02108089','n02108422','n02108551','n02108915','n02109047','n02109525','n02109961','n02110063',
'n02110185','n02110341','n02110627','n02110806','n02110958','n02111129','n02111277','n02111500','n02111889','n02112018','n02112137',
'n02112350','n02112706','n02113023','n02113186','n02113624','n02113712','n02113799','n02113978']
# ImageNet class IDs that pred_pet() accepts as "cat" predictions.
Cats=['n02123045','n02123159','n02123394','n02123597','n02124075','n02125311','n02127052']
def batch_img(img_path_list, batch_size):
    '''Yield consecutive slices of ``img_path_list`` of at most ``batch_size`` items.

    Args:
        img_path_list: Sequence to split (any sliceable sequence).
        batch_size: Maximum number of items per batch; must be positive.

    Yields:
        Slices of ``img_path_list`` in order; the last one may be shorter.

    Raises:
        ValueError: If ``batch_size`` is not positive. Because this is a
            generator, the error surfaces on the first iteration.
    '''
    if batch_size <= 0:
        # A negative step would silently produce no batches at all.
        raise ValueError("batch_size must be positive, got {}".format(batch_size))
    for begin in range(0, len(img_path_list), batch_size):
        # Slicing already clamps the end index to the sequence length.
        yield img_path_list[begin:begin + batch_size]
def read_batch_img(batch_imgpath_list, height=299, width=299):
    '''Read images, convert BGR to RGB, resize them and stack them in one array.

    Args:
        batch_imgpath_list: Paths of the images to load.
        height: Target number of rows per image (default 299).
        width: Target number of columns per image (default 299).

    Returns:
        ``uint8`` array of shape ``(len(batch_imgpath_list), height, width, 3)``.

    Raises:
        ValueError: If an image cannot be read by OpenCV.
    '''
    images = np.zeros((len(batch_imgpath_list), height, width, 3), dtype=np.uint8)
    for i, img_path in enumerate(batch_imgpath_list):
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread() returns None instead of raising on a bad file.
            raise ValueError("cannot read image: {}".format(img_path))
        img = img[:, :, ::-1]  # BGR -> RGB
        # cv2.resize() takes the target size as (width, height).
        images[i] = cv2.resize(img, (width, height))
    return images
def pred_pet(model, img_path_list, top_num, preprocess_input, decode_predictions, batch_size = 32):
    '''Check whether each image's top predictions agree with its file name.

    An image counts as a match when its file name contains 'dog' and one of
    the ``top_num`` decoded classes is in ``Dogs``, or its file name contains
    'cat' and one of them is in ``Cats``.

    Args:
        model: Object with a ``predict(X)`` method returning class scores.
        img_path_list: Paths of the images to check.
        top_num: Number of top classes to inspect per image.
        preprocess_input: Callable applied to the image batch before prediction.
        decode_predictions: Callable ``(preds, top=...)`` returning, per image,
            a list of tuples whose first element is the class ID.
        batch_size: Number of images predicted per call.

    Returns:
        List of booleans, one per entry of ``img_path_list`` and in the same
        order: True when the image matched, False otherwise.
    '''
    ret = []
    if not img_path_list:
        return ret
    # Sets make the per-prediction membership test constant time.
    dog_ids = set(Dogs)
    cat_ids = set(Cats)
    for batch_imgpath_list in batch_img(img_path_list, batch_size):
        X = preprocess_input(read_batch_img(batch_imgpath_list))
        preds = model.predict(X)
        dps = decode_predictions(preds, top = top_num)
        for img_path, decoded in zip(batch_imgpath_list, dps):
            # Read the label from the file name only, so directory names that
            # happen to contain 'dog' or 'cat' cannot influence the result.
            file_name = os.path.basename(img_path)
            is_dog = 'dog' in file_name
            is_cat = 'cat' in file_name
            # any() also yields False for an empty prediction list, keeping
            # exactly one result per image.
            ret.append(any(
                (is_dog and val[0] in dog_ids) or (is_cat and val[0] in cat_ids)
                for val in decoded
            ))
    return ret
def get_abnormal_v(train_image_list, topN = 50, cache_path = "./abnormal.txt"):
    """Return the training images that pred_pet() does not recognise as pets.

    The result is cached in a text file with one path per line. When that
    file already exists its content is returned as-is, without looking at
    ``train_image_list`` or ``topN``.

    Args:
        train_image_list: Paths of the images to screen.
        topN: Number of top predictions inspected per image.
        cache_path: Location of the cache file.

    Returns:
        List of image paths flagged as outliers.
    """
    if os.path.exists(cache_path):
        with open(cache_path, 'r') as f:
            # Ignore blank lines so no empty path ends up in the result.
            return [line.strip('\n') for line in f if line.strip()]

    ret = pred_pet(model_pre, train_image_list, topN, xception.preprocess_input, xception.decode_predictions)
    abnormal_v = [
        img_path
        for img_path, is_pet in zip(train_image_list, ret)
        if not is_pet
    ]
    with open(cache_path, 'w') as f:
        f.writelines("{}\n".format(item) for item in abnormal_v)
    return abnormal_v
# Images whose top-50 predictions never match the label in their file name.
abnormal_v = get_abnormal_v(train_image_list, topN=50)
# The list may come from the cache of an earlier run whose files were already
# deleted below, so only touch paths that still exist on disk.
_abnormal_on_disk = [img_path for img_path in abnormal_v if os.path.exists(img_path)]
if _abnormal_on_disk:
    display_img(_abnormal_on_disk, summary = False)
# Set lookup keeps the filtering linear in the number of training images.
_abnormal_set = set(abnormal_v)
train_image_list = [item for item in train_image_list if item not in _abnormal_set]
# NOTE: destructive -- the flagged image files are permanently removed.
for img_path in _abnormal_on_disk:
    os.remove(img_path)
探索数据集时发现,样本是顺序排列的(猫在前、狗在后),因此需要对数据集进行随机化处理。
import random
# Shuffle the training paths in place. No seed is set here, so the order
# depends on the generator state left by the earlier random.seed(2018) and
# random.sample() calls.
random.shuffle(train_image_list)
将图片读入内存,并生成对应的label,并将训练集图片分成5份,1份作为验证集,其余4份作为训练集。
# Load every remaining training image into one in-memory uint8 array
# (one 299x299x3 slot per image, as allocated by read_batch_img).
X = read_batch_img(train_image_list)
def get_lables(image_list):
    """Build 0/1 labels from image file names.

    Args:
        image_list: Image paths; the label is derived from the file name only.

    Returns:
        ``uint8`` array with 1 where the file name contains "dog" and 0
        otherwise, in the same order as ``image_list``.
    """
    # Use the base name so a directory containing "dog" cannot turn every
    # image into a dog.
    return np.array(
        [1 if "dog" in os.path.basename(item) else 0 for item in image_list],
        dtype=np.uint8,
    )
# Labels aligned with the rows of X.
Y = get_lables(train_image_list)
# The first fifth (rounded up) is the validation split; the rest is for training.
_n_val = math.ceil(len(train_image_list)/5)
val_X, val_Y = X[:_n_val], Y[:_n_val]
train_X, train_Y = X[_n_val:], Y[_n_val:]
base model选用Xception,不包含top的分类器,随后在base model后边加一个二分类分类器
from keras.models import Model
from keras.layers import Dense
from keras.utils import plot_model
from keras.utils.vis_utils import model_to_dot
from IPython.display import SVG

# Pre-trained backbone: Xception with ImageNet weights, no classification
# head, and average pooling on the output.
base_model = xception.Xception(
    weights='imagenet',
    input_shape=(299, 299, 3),
    include_top=False,
    pooling='avg',
)
x = base_model.output
# Binary classifier head: one sigmoid unit.
predictions = Dense(1, activation='sigmoid')(x)
# The model that is trained in the following sections.
model = Model(inputs=base_model.input, outputs=predictions)
model.summary()
# Print the index and name of every backbone layer.
for i, layer in enumerate(base_model.layers):
    print(i, layer.name)
# Write the architecture diagram to a file and render it as SVG.
plot_model(model, to_file='model.png')
SVG(model_to_dot(model).create(prog='dot', format='svg'))
我们会在xception基础上进行fine_tune。
history用于记录训练过程中的train_loss和val_loss(以及acc和val_acc),earlyStopping用于在监测值不再改善时提前终止训练。
from keras import callbacks
class LossHistory(callbacks.Callback):
    """Callback that records per-epoch metrics and checkpoints the model.

    After every epoch the loss, validation loss, accuracy and validation
    accuracy found in ``logs`` are appended to ``losses``, ``val_losses``,
    ``acc`` and ``val_acc``, and the model being trained is saved to
    ``xception_fine-tune_epoch{N}.h5``. ``N`` is 1-based and restarts at 1
    for every training run, so a later run overwrites the checkpoints of an
    earlier run that have the same ``N``.
    """

    def __init__(self):
        super().__init__()
        self._reset()

    def _reset(self):
        """Clear all recorded values and the epoch counter."""
        self.losses = []
        self.val_losses = []
        self.acc = []
        self.val_acc = []
        self.epoch = 0

    def on_train_begin(self, logs=None):
        """Start every training run with empty records."""
        self._reset()

    def on_epoch_begin(self, batch, logs=None):
        """Count the epoch that is starting (``batch`` is unused)."""
        self.epoch += 1

    def on_epoch_end(self, batch, logs=None):
        """Record this epoch's metrics and save a checkpoint (``batch`` is unused)."""
        logs = logs or {}
        self.losses.append(logs.get('loss'))
        self.val_losses.append(logs.get('val_loss'))
        # The accuracy key is 'acc' in some Keras versions and 'accuracy' in others.
        self.acc.append(logs.get('acc', logs.get('accuracy')))
        self.val_acc.append(logs.get('val_acc', logs.get('val_accuracy')))
        # Save the model this callback is attached to rather than whatever the
        # global name `model` happens to refer to.
        self.model.save('xception_fine-tune_epoch{}.h5'.format(self.epoch))
# Shared metric recorder / checkpoint writer, passed to every training call.
history = LossHistory()
# Stop a training run once val_loss has not improved for 3 consecutive epochs.
earlyStopping = callbacks.EarlyStopping(monitor='val_loss', patience=3, verbose=0, mode='auto')
def _plot_trend(train_values, val_values, metric_name, title_name):
    """Plot one training curve and its validation curve against the epoch."""
    plt.xlabel('Epochs')
    plt.ylabel(metric_name)
    plt.title(metric_name + ' Trend: ' + title_name)
    plt.plot(train_values, 'blue', label='Training ' + metric_name)
    plt.plot(val_values, 'green', label='Validation ' + metric_name)
    # One label per tick. Labels are 1-based so they match the N in the
    # 'xception_fine-tune_epoch{N}.h5' checkpoints written by LossHistory.
    plt.xticks(range(0, history.epoch), range(1, history.epoch + 1))
    plt.legend()
    plt.show()


def show_loss_and_acc(title_name):
    """Plot the loss and accuracy curves recorded by the global ``history``.

    Two figures are shown: training vs. validation loss, then training vs.
    validation accuracy.

    Args:
        title_name: Text appended to both plot titles.
    """
    _plot_trend(history.losses, history.val_losses, 'Loss', title_name)
    _plot_trend(history.acc, history.val_acc, 'acc', title_name)
# Training hyper-parameters shared by all training stages.
epochs = 30
batch_size = 32
# The validation split is the first ceil(N / 5) samples.
nb_validation_samples = math.ceil(len(train_image_list)/ 5)
# The training split is whatever remains. 4 * nb_validation_samples would
# overcount whenever N is not a multiple of 5.
nb_train_samples = len(train_image_list) - nb_validation_samples
# Freeze every Xception layer so that only the new classifier head is trained.
for layer in base_model.layers:
    layer.trainable = False
# Compile the model.
from keras.optimizers import Adam
model.compile(optimizer=Adam(lr=0.0001), loss='binary_crossentropy', metrics=['accuracy'])
# Image data augmentation.
from keras.preprocessing.image import ImageDataGenerator
# Training data: Xception preprocessing plus shear, zoom and horizontal flips.
train_datagen = ImageDataGenerator(preprocessing_function=xception.preprocess_input,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   horizontal_flip=True)
# Validation data: Xception preprocessing only.
validation_datagen = ImageDataGenerator(preprocessing_function=xception.preprocess_input)
train_generator = train_datagen.flow(x = train_X,
                                     y = train_Y,
                                     batch_size = batch_size,
                                     shuffle=True)
validation_generator = validation_datagen.flow(x = val_X,
                                               y = val_Y,
                                               batch_size = batch_size,
                                               shuffle=False)
# Train the model.
model.fit_generator(train_generator,
                    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
                    epochs=epochs,
                    validation_data=validation_generator,
                    validation_steps=math.ceil(nb_validation_samples/batch_size),
                    callbacks=[history, earlyStopping])
show_loss_and_acc("adam_train_cla")
由以上log信息可以看出,当只训练顶部的分类器时,第8代结束后val_loss最低,且经过3个epoch都没有比第8代更低,提前结束了训练。下面我们将加载第8个epoch结束后的model继续训练。
from keras.models import load_model

# Continue from the checkpoint written after epoch 8.
del model
model = load_model("xception_fine-tune_epoch8.h5")

# Freeze layers 0-115 and unfreeze everything from index 116 onwards
# (per the original note: the first 12 blocks stay locked, the last 2 are released).
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 116

# Recompile so the new trainable flags take effect.
# model.compile(optimizer=SGD(lr=0.0001, momentum=0.9), loss='binary_crossentropy', metrics=['accuracy'])
model.compile(
    optimizer=Adam(lr=0.0001),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Train the model.
model.fit_generator(
    train_generator,
    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=math.ceil(nb_validation_samples/batch_size),
    callbacks=[history, earlyStopping],
)
show_loss_and_acc("adam_train_top2")
由以上log信息可以看出,当放开顶端2个block时,第2代结束后val_loss最低,且经过3个epoch都没有比第2代更低,提前结束了训练。下面我们将加载第2个epoch结束后的model继续训练。
# Continue from the checkpoint written after epoch 2.
del model
model = load_model("xception_fine-tune_epoch2.h5")

# Freeze layers 0-95 and unfreeze everything from index 96 onwards
# (per the original note: the first 10 blocks stay locked, the last 4 are released).
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 96

# Compile the model.
model.compile(
    optimizer=Adam(lr=0.0001),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Train the model.
model.fit_generator(
    train_generator,
    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=math.ceil(nb_validation_samples/batch_size),
    callbacks=[history, earlyStopping],
)
show_loss_and_acc("adam_train_top4")
由以上log信息可以看出,当放开顶端4个block时,第2代结束后val_loss最低,且经过3个epoch都没有比第2代更低,提前结束了训练。
# Freeze every layer except the final classifier so that only the head is trained.
# The layers are taken from `model` itself: once `model` has been re-created
# with load_model(), the layers of `base_model` are no longer the layers of
# `model`, and freezing them would have no effect. For the model as originally
# built, model.layers[:-1] are exactly the base_model layers.
for layer in model.layers[:-1]:
    layer.trainable = False
model.layers[-1].trainable = True
# Compile the model (Adam with its default learning rate).
from keras.optimizers import Adam
model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])
# Image data augmentation.
from keras.preprocessing.image import ImageDataGenerator
# Training data: Xception preprocessing plus shear, zoom and horizontal flips.
train_datagen = ImageDataGenerator(preprocessing_function=xception.preprocess_input,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   horizontal_flip=True)
# Validation data: Xception preprocessing only.
validation_datagen = ImageDataGenerator(preprocessing_function=xception.preprocess_input)
train_generator = train_datagen.flow(x = train_X,
                                     y = train_Y,
                                     batch_size = batch_size,
                                     shuffle=True)
validation_generator = validation_datagen.flow(x = val_X,
                                               y = val_Y,
                                               batch_size = batch_size,
                                               shuffle=False)
# Train the model.
model.fit_generator(train_generator,
                    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
                    epochs=epochs,
                    validation_data=validation_generator,
                    validation_steps=math.ceil(nb_validation_samples/batch_size),
                    callbacks=[history, earlyStopping])
show_loss_and_acc("fine-tune-topClassifier")
由以上log信息可以看出,当只训练顶部的分类器时,第七代结束后val_loss最低,且经过3个epoch都没有比第七代更低,提前结束了训练。下面我们将加载第7个epoch结束后的model继续训练。
from keras.models import load_model

# Reload the checkpoint written after epoch 7 and keep a named copy of it.
del model
model = load_model("xception_fine-tune_epoch7.h5")
model.save("xception_fine-tune_topCla.h5")

# Freeze layers 0-115 and unfreeze everything from index 116 onwards
# (per the original note: the first 12 blocks stay locked, the last 2 are released).
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 116

# Recompile the model (Adam with its default learning rate).
model.compile(
    optimizer=Adam(),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Train the model.
model.fit_generator(
    train_generator,
    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=math.ceil(nb_validation_samples/batch_size),
    callbacks=[history, earlyStopping],
)
show_loss_and_acc('xception_fine-tune_1314block')
由以上log信息可以看出,当放开model后边2个block时,第8代结束后val_loss最低。下面我们将加载第8个epoch结束后的model继续训练。
# Reload the checkpoint written after epoch 8 and keep a named copy of it.
del model
model = load_model("xception_fine-tune_epoch8.h5")
model.save('xception_fine-tune_1314block.h5')

# Freeze layers 0-95 and unfreeze everything from index 96 onwards
# (per the original note: the first 10 blocks stay locked, the last 4 are released).
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 96

# Recompile the model.
model.compile(
    optimizer=Adam(lr=0.0001),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Train the model.
model.fit_generator(
    train_generator,
    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=math.ceil(nb_validation_samples/batch_size),
    callbacks=[history, earlyStopping],
)
由以上log信息可以看出,当放开model后边4个block时,第2代结束后val_loss最低,已经达到了0.0100。下面加载其对应的模型,并另存为xception_fine-tune_11-14block.h5。
# Reload the checkpoint written after epoch 2 and keep it under its own name.
del model
model = load_model("xception_fine-tune_epoch2.h5")
model.save('xception_fine-tune_11-14block.h5')
# Plot the curves currently held in `history`.
show_loss_and_acc("xception_fine-tune_11-14block")
利用此模型对test测试集进行预测,将结果提交kaggle得到了0.04734的成绩。但验证集loss呈现出锯齿状.
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator
from keras.models import load_model

# Freeze layers 0-95 and unfreeze everything from index 96 onwards
# (per the original note: the first 10 blocks stay locked, the last 4 are released).
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 96

# Compile the model.
model.compile(
    optimizer=Adam(lr=0.0001),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Training data: Xception preprocessing plus shear, zoom and horizontal flips.
train_datagen = ImageDataGenerator(
    preprocessing_function=xception.preprocess_input,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
)
# Validation data: Xception preprocessing only.
validation_datagen = ImageDataGenerator(preprocessing_function=xception.preprocess_input)
train_generator = train_datagen.flow(
    x=train_X, y=train_Y, batch_size=batch_size, shuffle=True)
validation_generator = validation_datagen.flow(
    x=val_X, y=val_Y, batch_size=batch_size, shuffle=False)

# Train the model.
model.fit_generator(
    train_generator,
    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=math.ceil(nb_validation_samples/batch_size),
    callbacks=[history, earlyStopping],
)

# Reload the checkpoint written after epoch 5 and keep it under its own name.
del model
model = load_model("xception_fine-tune_epoch5.h5")
model.save("xception_fine-tune_top4block.h5")
show_loss_and_acc("fine-tune-top4block")
利用此模型对test测试集进行预测,将结果提交kaggle得到了0.04833的成绩。
from keras.optimizers import SGD
from keras.preprocessing.image import ImageDataGenerator

# Freeze layers 0-95 and unfreeze everything from index 96 onwards
# (per the original note: the first 10 blocks stay locked, the last 4 are released).
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 96

# Compile the model, this time with SGD plus momentum.
model.compile(
    optimizer=SGD(lr=0.0001, momentum=0.9),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Training data: Xception preprocessing plus shear, zoom and horizontal flips.
train_datagen = ImageDataGenerator(
    preprocessing_function=xception.preprocess_input,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
)
# Validation data: Xception preprocessing only.
validation_datagen = ImageDataGenerator(preprocessing_function=xception.preprocess_input)
train_generator = train_datagen.flow(
    x=train_X, y=train_Y, batch_size=batch_size, shuffle=True)
validation_generator = validation_datagen.flow(
    x=val_X, y=val_Y, batch_size=batch_size, shuffle=False)

# Train the model.
model.fit_generator(
    train_generator,
    steps_per_epoch=math.ceil(nb_train_samples/batch_size),
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=math.ceil(nb_validation_samples/batch_size),
    callbacks=[history, earlyStopping],
)
show_loss_and_acc("fine-tune-top4block_SGD")
# Save the model as it stands at the end of this training run.
model.save("fine-tune-top4block_SGD.h5")
利用此模型对test测试集进行预测,将结果提交kaggle得到了0.03974的成绩
利用训练好的模型对测试集进行预测,并将测试结果写入csv文件中,将csv文件上传kaggle验证最终结果。
import pandas as pd
from keras.applications import *
# Start from the sample submission; its 'label' column is overwritten below.
df = pd.read_csv("sample_submission.csv")
# Saved models to evaluate; one submission file is written per model.
model_list = ["xception_fine-tune_11-14block.h5","xception_fine-tune_top4block.h5", "fine-tune-top4block_SGD.h5"]
from keras.models import load_model
for model_name in model_list:
    # Drop the previous model before loading the next one.
    del model
    model = load_model(model_name)
    for batch_imgpath_list in batch_img(test_image_list, batch_size):
        X = read_batch_img(batch_imgpath_list)
        X = xception.preprocess_input(X)
        pred_y = model.predict_on_batch(X)
        # Clip the predictions to [0.005, 0.995].
        # Other ranges that were tried: 0.001/0.999, 0.0005/0.9995, 0.0001/0.9999.
        pred_y = pred_y.clip(min=0.005, max=0.995)
        for i, filename in enumerate(batch_imgpath_list):
            # The numeric part of the file name is the image id.
            index = int(os.path.basename(filename).split('.')[0])
            # Store a plain float, not the one-element prediction row.
            # NOTE(review): assumes row `id - 1` of the sample submission
            # belongs to image `id` -- confirm against the CSV.
            df.at[index-1, 'label'] = float(pred_y[i][0])
    # NOTE(review): the file name says "Clip999" although the clip range used
    # above is 0.005-0.995; the name is left unchanged.
    df.to_csv('pred_'+ (model_name.split('.')[0] +'_Clip999.csv'), index=None)
# Switch to the SGD fine-tuned model for the visual checks below.
del model
model = load_model("fine-tune-top4block_SGD.h5")
def display_pred_img(img_list):
    """Show images in a five-column grid, titled with the model's prediction.

    All images are predicted in a single batch with the global ``model``.
    An output of 0.5 or more is reported as dog, anything lower as cat, with
    the corresponding percentage.

    Args:
        img_list: Paths of the images to predict and display.

    Returns:
        None. Nothing is drawn when ``img_list`` is empty.
    """
    if not img_list:
        # A grid with zero rows would request a zero-height figure.
        return
    n_cols = 5
    n_rows = math.ceil(len(img_list) / n_cols)
    fig = plt.figure(figsize=(15, 3 * n_rows))
    img = read_batch_img(img_list)
    # Hand the preprocessing a float copy so the uint8 array kept for display
    # can never be modified by it.
    X = xception.preprocess_input(img.astype(np.float32))
    pred_y = model.predict_on_batch(X)
    for i, img_path in enumerate(img_list):
        ax = fig.add_subplot(n_rows, n_cols, i + 1)
        dog_prob = float(pred_y[i][0])
        if dog_prob >= 0.5:
            ax.set_title("{} : {:.2f}% is dog".format(os.path.basename(img_path), dog_prob * 100))
        else:
            ax.set_title("{} : {:.2f}% is cat".format(os.path.basename(img_path), (1 - dog_prob) * 100))
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(img[i])
    plt.show()
# Visual check on the first ten and the last ten test images.
dis_pred_img_list = test_image_list[:10] + test_image_list[-10:]
display_pred_img(dis_pred_img_list)
# Reload the same saved model (it is already loaded at this point).
del model
model = load_model("fine-tune-top4block_SGD.h5")
# Predict and show every image found in the ./robustness/ directory.
robustness_image_path="./robustness/"
robustness_image_list=[]
get_image_list(robustness_image_path, robustness_image_list)
display_pred_img(robustness_image_list)